# Copyright (c) HySoP 2011-2024
#
# This file is part of HySoP software.
# See "https://particle_methods.gricad-pages.univ-grenoble-alpes.fr/hysop-doc/"
# for further info.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import inspect
import networkx
from hysop import dprint
from hysop.constants import MemoryOrdering
from hysop.tools.htypes import check_instance, first_not_None
from hysop.tools.decorators import debug, wraps, profile
[docs]
def is_directed_acyclic_graph(graph):
    """Tell whether ``graph`` is a directed acyclic graph.

    Thin wrapper that forwards ``graph`` to the networkx routine of the
    same name and returns its result unchanged.
    """
    dag_check = networkx.algorithms.dag.is_directed_acyclic_graph
    return dag_check(graph)
[docs]
def transitive_reduction(graph):
    """Return the transitive reduction of ``graph`` with edge data restored.

    networkx builds a brand new graph for the reduction, so everything
    attached to the original edges (and to the graph object itself) has
    to be copied back by hand.

    Parameters
    ----------
    graph:
        Directed acyclic graph, as accepted by
        ``networkx.algorithms.dag.transitive_reduction``.

    Returns
    -------
    The reduced graph. ``graph`` itself is left untouched.
    """
    reduced_graph = networkx.algorithms.dag.transitive_reduction(graph)

    # copy back edge attributes (node data is automatically transferred
    # because nodes are the data (VertexAttributes))
    for u, v in reduced_graph.edges:
        reduced_graph[u][v].update(graph[u][v])

    # VertexAttributes stores its node-id counter on the graph object.
    # Carry it over so that vertices created later on the reduced graph
    # do not restart at 0 and collide (same hash, same equality) with the
    # nodes inherited from ``graph``.
    if hasattr(graph, "_hysop_node_counter"):
        reduced_graph._hysop_node_counter = graph._hysop_node_counter

    return reduced_graph
[docs]
def all_simple_paths(graph, src, dst):
    """Collect into a tuple every path networkx yields from ``src`` to ``dst``.

    The networkx generator is fully consumed, so the result can be
    iterated several times and has a length.
    """
    path_iterator = networkx.algorithms.simple_paths.all_simple_paths(
        graph, src, dst
    )
    return tuple(path_iterator)
[docs]
def lexicographical_topological_sort(graph):
    """Return the nodes of ``graph`` as a tuple in a deterministic topological order.

    Ties between nodes are broken using ``int(node)``.
    """
    # Lexicographical sort ensures a unique permutation of nodes
    # such that they are in the same topological order on each
    # MPI process. Else operators will not be executed in the same
    # order and everything deadlocks on MPI synchronization.
    ordered_nodes = networkx.algorithms.dag.lexicographical_topological_sort(
        graph, key=int
    )
    return tuple(ordered_nodes)
[docs]
def new_directed_graph():
    """Create and return a fresh, empty ``networkx.DiGraph``."""
    empty_graph = networkx.DiGraph()
    return empty_graph
[docs]
def new_vertex(graph, *args, **kwds):
    """Create a ``VertexAttributes`` node, add it to ``graph`` and return it.

    Extra positional and keyword arguments are forwarded to the
    ``VertexAttributes`` constructor after ``graph``.
    """
    # /!\ We have to use networkx 2.2 which has a different interface for attributes
    vertex = VertexAttributes(graph, *args, **kwds)
    graph.add_node(vertex)
    return vertex
[docs]
def new_edge(graph, u, v, *args, **kwds):
    """Create the edge ``u -> v`` or enrich it when it already exists.

    Both endpoints must already belong to ``graph``. Remaining arguments
    are handed either to the ``EdgeAttributes`` constructor (new edge) or
    to ``EdgeAttributes.update`` on the object stored under the edge's
    ``"data"`` key (existing edge).

    Returns the pair ``(u, v)``.
    """
    # /!\ We have to use networkx 2.2 which has a different interface for attributes
    assert u in graph
    assert v in graph

    successors = graph[u]
    if v in successors:
        # Edge already present: merge the new dependency into its data.
        successors[v]["data"].update(*args, **kwds)
    else:
        graph.add_edge(u, v, data=EdgeAttributes(*args, **kwds))
    return (u, v)
[docs]
def generate_vertex_colors():
    """Build the tuple of hex colors used to paint graph vertices.

    Returns ``None`` when matplotlib cannot be imported. Otherwise the
    colors of the ``tab20c`` and ``tab20b`` colormaps are picked with a
    stride of 4, starting from offsets 2, 3, 0 and 1 in that order, and
    converted to hex strings.
    """
    try:
        import matplotlib
    except ImportError:
        return None

    tab20c = matplotlib.colormaps["tab20c"].colors
    tab20b = matplotlib.colormaps["tab20b"].colors

    palette = []
    for offset in (2, 3, 0, 1):
        palette.extend(tab20c[offset::4] + tab20b[offset::4])
    return tuple(matplotlib.colors.to_hex(rgb) for rgb in palette)
[docs]
class VertexAttributes:
    """Simple class to hold vertex data.

    Instances are used directly as networkx nodes: they hash and compare
    through ``node_id``, an integer drawn from a counter stored on the
    graph they are created for. The ``label``, ``title``, ``shape`` and
    ``color`` members are the pyvis attributes used for display.
    """

    # Palette indexed by command queue; None when matplotlib is unavailable.
    colors = generate_vertex_colors()

    def __init__(self, graph, operator=None):
        """Allocate the next node id of ``graph`` and bind ``operator`` to it."""
        # The counter lives on the graph object itself, so ids are only
        # unique among the vertices created for that same graph object.
        node_id = getattr(graph, "_hysop_node_counter", 0)
        graph._hysop_node_counter = node_id + 1

        self.node_id = node_id
        self.operator = operator
        self.input_states = None
        self.output_states = None
        self.op_ordering = None
        self.command_queue = None

    def copy_attributes(self, other):
        """Fill the attributes that are still ``None`` with those of ``other``.

        Values already set on ``self`` take precedence. ``other`` may be
        ``None``, in which case nothing happens. Returns ``self``.
        """
        if other is None:
            return self
        check_instance(other, VertexAttributes)
        for vname in (
            "operator",
            "input_states",
            "output_states",
            "op_ordering",
            "command_queue",
        ):
            merged = first_not_None(getattr(self, vname), getattr(other, vname))
            setattr(self, vname, merged)
        return self

    def set_op_info(self, operator, input_states, output_states):
        """Record the input and output states of the bound operator.

        ``operator`` must be the very object this vertex was created
        with (checked with assertions). Returns ``self``.
        """
        assert self.operator is not None
        assert self.operator is operator
        self.operator = operator
        self.input_states = input_states
        self.output_states = output_states
        return self

    # hashing for networkx
    def __hash__(self):
        return self.node_id

    def __eq__(self, other):
        # Dictionary lookups may compare a vertex with any object sharing
        # its hash (e.g. a plain int). Let Python fall back to its default
        # comparison instead of raising AttributeError in that case.
        try:
            return self.node_id == other.node_id
        except AttributeError:
            return NotImplemented

    def __int__(self):
        return self.node_id

    # pyvis attributes for display
    @property
    def label(self):
        """Operator pretty name, preceded by ``(op_ordering)`` when it is set."""
        s = f"{self.operator.pretty_name}"
        if self.op_ordering is not None:
            s = f"({self.op_ordering})\n{s}"
        return s

    @property
    def title(self):
        """``node_info()`` with newlines turned into ``<br>`` tags."""
        return self.node_info().replace("\n", "<br>")

    def shape(self, with_custom_nodes=True):
        """Return the node shape: ``"box"`` for special operators, else ``"circle"``.

        Redistribute, transpose and memory reordering operators get a box
        unless ``with_custom_nodes`` is false.
        """
        # NOTE(review): imports are local, presumably to avoid circular
        # imports at module load time -- confirm before moving them.
        from hysop.operator.base.transpose_operator import TransposeOperatorBase
        from hysop.operator.base.redistribute_operator import RedistributeOperatorBase
        from hysop.operator.base.memory_reordering import MemoryReorderingBase

        special_shapes = {
            RedistributeOperatorBase: "box",
            TransposeOperatorBase: "box",
            MemoryReorderingBase: "box",
        }
        if with_custom_nodes:
            for op_type, shape in special_shapes.items():
                if isinstance(self.operator, op_type):
                    return shape
        return "circle"

    @property
    def color(self):
        """Hex color associated to the command queue, or ``None``.

        ``None`` is returned when no command queue has been set, or when
        no palette is available (matplotlib could not be imported).
        """
        cq = self.command_queue
        if cq is None:
            return None
        assert isinstance(cq, int) and cq >= 0
        colors = self.colors
        if not colors:
            # generate_vertex_colors() returned None: no palette to pick from.
            return None
        return colors[cq % len(colors)]

    def node_info(self):
        """Build the HTML-flavoured description of the bound operator.

        The text lists, when present, the rank (``op_ordering``), the
        input parameters and fields, the output parameters and fields,
        and finally the class hierarchy of the operator. Lines are
        separated by newlines; ``title`` converts them to ``<br>``.
        """
        op = self.operator
        istates = self.input_states
        ostates = self.output_states

        ifields = op.input_fields
        ofields = op.output_fields
        iparams = op.input_params
        oparams = op.output_params

        memorder2str = {
            MemoryOrdering.C_CONTIGUOUS: "C",
            MemoryOrdering.F_CONTIGUOUS: "F",
        }

        def field_info(field, topo, states):
            # "name, topology id" followed, when states are known, by the
            # memory ordering letter and the transposition state.
            info = (field.pretty_name, topo.id)
            if states:
                assert field in states
                state = states[field]
                assert state is not None
                info += (memorder2str[state.memory_order],)
                info += (str(state.tstate),)
            return ", ".join(map(str, info))

        prefix = "  <b>"
        suffix = "</b>  "
        sep = "\n" + " " * 14

        def section(title, pad, entries):
            # One titled block; ``pad`` aligns the short titles (Pin, Fin)
            # with the longer ones (Pout, Fout).
            return f"{prefix}{title}:{suffix}{pad}{sep.join(entries)}\n"

        # A rank of 0 is a valid ordering and must be displayed, exactly
        # like ``label`` does: test against None, not truthiness.
        rank = (
            f"{prefix}Rank:{suffix}{self.op_ordering}\n\n"
            if self.op_ordering is not None
            else ""
        )
        pin = (
            section("Pin", "  ", [param.pretty_name for param in iparams.keys()])
            if iparams
            else ""
        )
        fin = (
            section(
                "Fin",
                "  ",
                [field_info(f, topo, istates) for (f, topo) in ifields.items()],
            )
            if ifields
            else ""
        )
        pout = (
            section("Pout", "", [param.pretty_name for param in oparams.keys()])
            if oparams
            else ""
        )
        fout = (
            section(
                "Fout",
                "",
                [field_info(f, topo, ostates) for (f, topo) in ofields.items()],
            )
            if ofields
            else ""
        )
        # The last two entries of the MRO are left out of the listing.
        type_names = sep.join(cls.__name__ for cls in type(op).__mro__[:-2])
        op_type = f"{prefix}Type:{suffix} {type_names}"

        return f"<h2>Operator {op.name}</h2>{rank}{pin}{fin}{pout}{fout}\n{op_type}"
[docs]
class EdgeAttributes:
    """Simple class to hold edge data.

    ``variables`` maps each variable carried by the edge to the set of
    topologies it is carried on (``None`` is stored as a topology when
    none was given).
    """

    def __init__(self, *args, **kwds):
        """Start with no dependency, then forward all arguments to ``update``."""
        self.variables = {}
        self.update(*args, **kwds)

    def update(self, variable=None, topology=None):
        """Register ``topology`` for ``variable``.

        Calling without a variable is a no-op; a topology must not be
        given in that case.
        """
        if variable is not None:
            topologies = self.variables.setdefault(variable, set())
            topologies.add(topology)
            return
        assert topology is None

    def __str__(self):
        """HTML summary: one ``<br>``-separated entry per variable."""
        prefix = "  <b>"
        suffix = "</b>  "

        entries = []
        for var, topologies in self.variables.items():
            # Without a topology only the variable name can be shown.
            described = ", ".join(
                var.pretty_name if (topo is None) else var[topo].short_description()
                for topo in topologies
            )
            entries.append(f"{prefix}{var.pretty_name}:{suffix}{described}")

        body = "\n".join(entries)
        return f"<h2>Variable dependencies</h2>{body}".replace("\n", "<br>")
[docs]
class ComputationalGraphNodeData:
    """
    Simple class to hold some node data: a level, a node id and the
    list of keyword-argument dictionaries collected in ``apply_kargs``.
    """

    def __init__(self, current_level, node_id):
        self.current_level, self.node_id = current_level, node_id
        # list of dictionaries, last one has priority
        self.apply_kargs = []

    def __str__(self):
        return "(lvl={},id={})".format(self.current_level, self.node_id)
if __debug__:
    # python in debug mode, all decorators do check their target attribute

    def _require_state(f, attr, reason):
        """Wrap ``f`` so it raises RuntimeError unless ``self.<attr>`` is truthy.

        ``self`` is the first positional argument of the wrapped call and
        ``reason`` ends the error message. The message is only built when
        the check fails.
        """
        assert callable(f)

        @wraps(f)
        def _checked(*args, **kargs):
            self = args[0]
            if not getattr(self, attr):
                raise RuntimeError(
                    "Cannot call {}.{}() on node '{}' because {}".format(
                        self.__class__.__name__, f.__name__, self.name, reason
                    )
                )
            return f(*args, **kargs)

        return _checked

    def not_initialized(f):
        """Decorator for methods meant to run before node initialization.

        Currently a plain pass-through, see the note below.
        """
        assert callable(f)

        @wraps(f)
        def _not_initialized(*args, **kargs):
            # NOTE(review): the original wrapper returned f(...) before
            # its "this node has already been initialized." check, which
            # made the check unreachable. The pass-through is kept so
            # that existing callers keep working -- confirm whether the
            # check should be enabled again.
            return f(*args, **kargs)

        return _not_initialized

    def initialized(f):
        """Raise RuntimeError unless ``self.initialized`` is set."""
        return _require_state(
            f, "initialized", "this node has not been initialized yet."
        )

    def discretized(f):
        """Raise RuntimeError unless ``self.discretized`` is set."""
        return _require_state(
            f, "discretized", "this node has not been discretized yet."
        )

    def ready(f):
        """Raise RuntimeError unless ``self.ready`` is set."""
        return _require_state(f, "ready", "this node has not been set up.")

    def graph_built(f):
        """Raise RuntimeError unless ``self.graph_built`` is set."""
        return _require_state(f, "graph_built", "the graph has not been built yet.")

    def generated(f):
        """Raise RuntimeError unless ``self.generated`` is set."""
        return _require_state(f, "generated", "this node has not been generated yet.")

else:  # not __debug__
    # python optimized, no checks

    def not_initialized(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        return f

    def initialized(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        return f

    def discretized(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        return f

    def ready(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        # Must exist in this branch too: op_apply decorates with @ready and
        # would otherwise fail with NameError under ``python -O``.
        return f

    def graph_built(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        return f

    def generated(f):
        """No-op decorator (checks are disabled when python runs optimized)."""
        return f
[docs]
def op_apply(f):
    """Decorate an operator ``apply`` method with debugging hooks.

    The returned function is wrapped with the ``debug``, ``profile`` and
    ``ready`` decorators and then, depending on its keyword arguments:

    * ``debug_dumper`` given, not ``None`` and with a truthy
      ``enable_on_op_apply``: the operator input parameters and input
      discrete fields are dumped before the call, the output ones after
      it. A ``simulation`` keyword argument is required in that case.
    * otherwise, ``dbg`` given, not ``None`` and with a truthy
      ``enable_on_op_apply``: ``dbg`` is called before and after the
      call with the source file of ``f``.
    * otherwise ``f`` is simply called.

    In all cases the value returned by ``f`` is returned. The operator
    is the first positional argument of the call.
    """

    @debug
    @profile
    @ready
    def apply(*args, **kwds):
        dbg = kwds.get("dbg")
        dbg_enabled = (dbg is not None) and dbg.enable_on_op_apply

        dumper = kwds.get("debug_dumper")
        dump_enabled = (dumper is not None) and dumper.enable_on_op_apply

        op = args[0]

        if dump_enabled:
            assert "simulation" in kwds
            simu = kwds["simulation"]
            it = simu.current_iteration
            t = simu.t()
            # Every dump is tagged with the location of the wrapped function.
            _file = inspect.getsourcefile(f)
            _, _line = inspect.getsourcelines(f)
            description = f"{_file}:{_line}"

            def dump(stage, params, dfields):
                # Dump parameters then discrete fields, both sorted by
                # name so that the dump order is reproducible.
                for param in sorted(params.keys(), key=lambda x: x.name):
                    tag = f"{stage}_{op.name}_{param.name}"
                    dumper(it, t, tag, (param._value,), description=description)
                for dfield in sorted(dfields.values(), key=lambda x: x.name):
                    tag = f"{stage}_{op.name}_{dfield.name}"
                    dumper(
                        it,
                        t,
                        tag,
                        tuple(
                            df.sdata.get().handle[df.compute_slices]
                            for df in dfield.dfields
                        ),
                        description=description,
                    )

            dump("pre", op.input_params, op.input_discrete_fields)
            ret = f(*args, **kwds)
            dump("post", op.output_params, op.output_discrete_fields)
            return ret

        if dbg_enabled:
            msg = inspect.getsourcefile(f)
            dbg("pre " + msg, nostack=True)
            ret = f(*args, **kwds)
            dbg("post " + msg, nostack=True)
            return ret

        return f(*args, **kwds)

    return apply